Java 8 থেকে Parallel Streams এবং অন্যান্য নতুন কনকারেন্সি ফিচার যোগ করা হয়েছে, যা ডেটা প্রসেসিং এবং মাল্টি-কোর প্রসেসরের সর্বোত্তম ব্যবহার নিশ্চিত করে।
Parallel Streams
Parallel Streams হলো Java Streams API এর একটি বৈশিষ্ট্য, যা একই স্ট্রিম ডেটা মাল্টিপল থ্রেডে প্রসেস করতে দেয়। এটি ডেটা প্রক্রিয়াকরণে স্বয়ংক্রিয়ভাবে প্যারালালিজম ব্যবহার করে।
Parallel Streams ব্যবহার করার মূল বৈশিষ্ট্য
- সারল্য: সাধারণ স্ট্রিমকে সহজেই
parallel()মেথড দিয়ে প্যারালাল করা যায়। - বহু-কোর প্রসেসর ব্যবহার: মাল্টি-কোর প্রসেসরের শক্তি ব্যবহার করে বড় ডেটাসেট দ্রুত প্রসেস করা।
- ForkJoinPool ব্যবহার: Parallel Streams অভ্যন্তরীণভাবে ForkJoinPool ব্যবহার করে প্যারালাল প্রসেসিং পরিচালনা করে।
Parallel Streams উদাহরণ
import java.util.Arrays;
import java.util.List;
public class ParallelStreamExample {
public static void main(String[] args) {
List<String> names = Arrays.asList("John", "Jane", "Jack", "Jill");
// Sequential Stream
System.out.println("Sequential Stream:");
names.stream().forEach(name -> {
System.out.println(Thread.currentThread().getName() + " - " + name);
});
// Parallel Stream
System.out.println("\nParallel Stream:");
names.parallelStream().forEach(name -> {
System.out.println(Thread.currentThread().getName() + " - " + name);
});
}
}
আউটপুট:Sequential Stream সব ডেটা মেইন থ্রেডে প্রসেস করে, আর Parallel Stream মাল্টিপল থ্রেডে প্রসেস করে।
Java 8 Concurrency ফিচার
Java 8 এ কনকারেন্সি ব্যবস্থার উন্নতির জন্য বেশ কিছু নতুন ফিচার এবং API অন্তর্ভুক্ত করা হয়েছে। প্রধানগুলো হলো:
১. CompletableFuture
CompletableFuture হলো Java 8 এ অ্যাসিনক্রোনাস টাস্ক পরিচালনার জন্য একটি API, যা মাল্টি-থ্রেডিংকে সহজ এবং কার্যকর করে।
উদাহরণ:
import java.util.concurrent.CompletableFuture;
public class CompletableFutureExample {
public static void main(String[] args) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("Running in: " + Thread.currentThread().getName());
});
// Wait for the future to complete
future.join();
}
}
কমন মেথড:
runAsync(): কোনো রিটার্ন ভ্যালু ছাড়াই টাস্ক চালু করে।supplyAsync(): একটি রিটার্ন ভ্যালু সহ টাস্ক চালু করে।thenApply(): আগের টাস্কের ফলাফল প্রসেস করে।
২. ForkJoinPool
Java 7 এ প্রবর্তিত হলেও, Java 8 এ ForkJoinPool parallel streams এবং CompletableFuture এর মাধ্যমে অধিক কার্যকর হয়েছে। এটি RecursiveTask এবং RecursiveAction ব্যবহার করে কাজ ভাগ করে।
উদাহরণ:
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ForkJoinPool;
public class ForkJoinExample {
static class SumTask extends RecursiveTask<Integer> {
private final int[] numbers;
private final int start, end;
private static final int THRESHOLD = 5;
public SumTask(int[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
if ((end - start) <= THRESHOLD) {
int sum = 0;
for (int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
} else {
int mid = (start + end) / 2;
SumTask leftTask = new SumTask(numbers, start, mid);
SumTask rightTask = new SumTask(numbers, mid, end);
leftTask.fork();
int rightResult = rightTask.compute();
int leftResult = leftTask.join();
return leftResult + rightResult;
}
}
}
public static void main(String[] args) {
int[] numbers = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
ForkJoinPool pool = new ForkJoinPool();
SumTask task = new SumTask(numbers, 0, numbers.length);
int result = pool.invoke(task);
System.out.println("Sum: " + result);
}
}
Parallel Streams vs CompletableFuture
| বৈশিষ্ট্য | Parallel Streams | CompletableFuture |
|---|---|---|
| লক্ষ্য | ডেটা প্রসেসিং (streams API) | অ্যাসিনক্রোনাস টাস্ক ম্যানেজমেন্ট |
| বিল্ট-ইন Thread Pool | Common ForkJoinPool | Common ForkJoinPool বা কাস্টম থ্রেড পুল |
| ব্যবহার ক্ষেত্র | স্ট্রিম ডেটা প্যারালাল প্রসেসিং | অ্যাসিনক্রোনাস টাস্ক চেইনিং |
| কাস্টমাইজেশন | সীমিত | আরো নমনীয় |
Java 8 Concurrency-এর সুবিধা
- সারল্য: মাল্টি-থ্রেডিং সহজ এবং সংক্ষিপ্ত কোড।
- উন্নত পারফরম্যান্স: মাল্টি-কোর প্রসেসরের শক্তি ব্যবহার করে।
- ফিচার সমৃদ্ধ API: Parallel Streams, CompletableFuture, এবং Lambda Integration।
- Parallel Streams: বড় ডেটাসেট দ্রুত প্রসেস করতে কার্যকর।
- CompletableFuture: অ্যাসিনক্রোনাস টাস্ক ম্যানেজমেন্টে নমনীয়তা এবং শক্তি যোগ করে।
- Java 8 এর কনকারেন্সি ফিচারগুলো উন্নত কর্মক্ষমতা, কার্যক্ষমতা, এবং ডেভেলপমেন্ট অভিজ্ঞতা প্রদান করে।
Java 8 Parallel Stream API হলো একটি শক্তিশালী টুল যা parallelism ব্যবহার করে ডেটা প্রসেসিং সহজ এবং কার্যকরী করে। এটি মূলত বড় ডেটাসেট প্রক্রিয়াকরণে পারফরম্যান্স বৃদ্ধি করতে সাহায্য করে। Parallel Stream ব্যবহার করলে ফর্ক-জয়েন ফ্রেমওয়ার্ক (Fork-Join Framework) এর মাধ্যমে স্বয়ংক্রিয়ভাবে একাধিক থ্রেড ব্যবহার করা হয়।
Parallel Stream API এর ভূমিকা
- ডেটা প্রক্রিয়াকরণে গতি বৃদ্ধি:
- Parallel Stream একই ডেটাসেটের উপাদানগুলোকে সমান্তরালভাবে (parallelly) প্রক্রিয়াকরণ করে গতি বৃদ্ধি করে।
- মাল্টি-কোর প্রসেসরের ব্যবহার:
- Parallel Stream মাল্টি-কোর প্রসেসরের সম্পূর্ণ ক্ষমতা ব্যবহার করে।
- কোড সরলতা:
- মাল্টি-থ্রেডিং বা থ্রেড ম্যানেজমেন্টের প্রয়োজন ছাড়াই সমান্তরাল প্রসেসিং করা যায়।
- ফর্ক-জয়েন ফ্রেমওয়ার্ক ব্যবহার:
- এটি কাজগুলোকে ছোট ছোট সাবটাস্কে ভাগ করে সমান্তরালভাবে প্রক্রিয়াকরণ করে এবং তারপর রেজাল্টগুলো একত্রিত করে।
Parallel Stream API এর ব্যবহার
১. সাধারণ Stream vs Parallel Stream
Parallel Stream এর মাধ্যমে একাধিক থ্রেড ব্যবহার করা হয়, যেখানে সাধারণ Stream একটি থ্রেডে কাজ করে।
import java.util.stream.IntStream;
public class ParallelStreamExample {
public static void main(String[] args) {
System.out.println("Normal Stream:");
IntStream.range(1, 10).forEach(System.out::println);
System.out.println("\nParallel Stream:");
IntStream.range(1, 10).parallel().forEach(System.out::println);
}
}
আউটপুট:
- Normal Stream: 1 থেকে 9 পর্যন্ত সিরিয়াল আউটপুট।
- Parallel Stream: 1 থেকে 9 পর্যন্ত অনিয়মিত (non-deterministic) ক্রমে আউটপুট, কারণ এটি একাধিক থ্রেড ব্যবহার করে।
২. লিস্ট প্রসেসিং
import java.util.Arrays;
import java.util.List;
public class ListParallelStreamExample {
public static void main(String[] args) {
List<String> names = Arrays.asList("John", "Jane", "Tom", "Emily");
// Parallel Stream ব্যবহার
names.parallelStream()
.forEach(name -> System.out.println(Thread.currentThread().getName() + " processing " + name));
}
}
বৈশিষ্ট্য:
- প্রতিটি নাম একটি ভিন্ন থ্রেডে প্রক্রিয়াকরণ হতে পারে।
৩. ফিল্টার এবং ম্যাপ অপারেশন
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
public class ParallelStreamFilterExample {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
List<Integer> evenNumbers = numbers.parallelStream()
.filter(n -> n % 2 == 0)
.map(n -> n * 2)
.collect(Collectors.toList());
System.out.println("Processed Even Numbers: " + evenNumbers);
}
}
Parallel Stream ব্যবহার করার সুবিধা
- পারফরম্যান্স বৃদ্ধি:
- বড় ডেটাসেটের ক্ষেত্রে প্রসেসিং টাইম উল্লেখযোগ্যভাবে কমে যায়।
- সহজ ইমপ্লিমেন্টেশন:
- মাল্টি-থ্রেডিং কোড না লিখেই সমান্তরাল প্রসেসিং করা যায়।
- অটোমেটিক থ্রেড ম্যানেজমেন্ট:
- ফর্ক-জয়েন পুল স্বয়ংক্রিয়ভাবে থ্রেড ম্যানেজ করে।
- মাল্টি-কোর প্রসেসর ব্যবহার:
- মাল্টি-কোর প্রসেসর থেকে সর্বাধিক পারফরম্যান্স পাওয়া যায়।
Parallel Stream ব্যবহার করার সীমাবদ্ধতা
- সিঙ্ক্রোনাইজেশন ঝুঁকি:
- শেয়ারড রিসোর্স বা মিউটেবল ডেটার ক্ষেত্রে ডেটা রেস হতে পারে।
- ছোট ডেটাসেটে পারফরম্যান্স কমে যায়:
- Parallel Stream ছোট ডেটাসেটের জন্য অপ্রয়োজনীয় থ্রেড ম্যানেজমেন্টের কারণে ধীর হতে পারে।
- অপটিমাইজেশনের অভাব:
- Parallel Stream সব ধরনের কাজের জন্য পারফরম্যান্স বাড়াবে না। কিছু নির্দিষ্ট কাজ সিরিয়াল স্ট্রিমে দ্রুত হতে পারে।
- ডিবাগিং কঠিন:
- একাধিক থ্রেডের কারণে Parallel Stream এর ডিবাগিং জটিল হতে পারে।
Parallel Stream এর Best Practices
- ছোট ডেটাসেট এড়িয়ে চলুন:
- Parallel Stream বড় ডেটাসেটের জন্য উপযোগী।
- I/O-বাউন্ড কাজের জন্য ব্যবহার করবেন না:
- CPU-বাউন্ড কাজের জন্য এটি বেশি কার্যকর। I/O-নির্ভর কাজ সিরিয়াল স্ট্রিমে ভালো হয়।
স্ট্রিম অর্ডারিং প্রয়োজন হলে সতর্ক থাকুন:
- Parallel Stream এর আউটপুট ক্রম অগত্যা অর্ডার মেনে চলবে না।
forEachOrdered()ব্যবহার করতে পারেন যদি অর্ডার প্রয়োজন হয়।
IntStream.range(1, 10).parallel().forEachOrdered(System.out::println);- Parallel Stream এর আউটপুট ক্রম অগত্যা অর্ডার মেনে চলবে না।
- মিউটেবল অবজেক্ট ব্যবহার করবেন না:
- Immutable বা Thread-Safe ডেটা স্ট্রাকচার ব্যবহার করুন।
Custom Thread Pool প্রয়োজন হলে ব্যবহার করুন:
ForkJoinPoolকাস্টমাইজ করে কন্ট্রোল বাড়ানো যায়।
ForkJoinPool customPool = new ForkJoinPool(4); customPool.submit(() -> { IntStream.range(1, 10).parallel().forEach(System.out::println); }).join();
Java 8 এর Parallel Stream API ডেটা প্রক্রিয়াকরণের সময় সমান্তরাল প্রসেসিং সহজ এবং কার্যকরী করে। এটি বড় ডেটাসেট এবং CPU-বাউন্ড টাস্কগুলোর জন্য পারফেক্ট, তবে এর সীমাবদ্ধতাগুলো বুঝে সঠিকভাবে ব্যবহার করা উচিত। Parallel Stream ব্যবহার করলে প্রোগ্রামের কার্যকারিতা বাড়ানো সম্ভব, তবে ভুল ব্যবহারে পারফরম্যান্স কমে যেতে পারে।
Stream API এবং ForkJoinPool একত্রে ব্যবহার করলে জাভার মাল্টিপ্রসেসিং এবং প্যারালাল প্রোগ্রামিং আরও কার্যকর এবং সহজ হয়ে ওঠে।
Stream API এর ভূমিকা
Stream API জাভা ৮-এ যোগ করা হয়, যা ডেটা প্রক্রিয়াজাত করার জন্য একটি ডিক্লারেটিভ এবং কার্যকরী পদ্ধতি সরবরাহ করে। এটি লেজি-ইভালুয়েশন মডেল এবং প্যারালাল প্রসেসিং সমর্থন করে।
ForkJoinPool এর ভূমিকা
ForkJoinPool একটি বিশেষ ধরনের ExecutorService, যা টাস্কগুলোকে ছোট ছোট সাবটাস্কে বিভক্ত করে প্যারালাল প্রক্রিয়াজাত করতে ব্যবহার করা হয়। Stream API এর parallel() মেথডের মাধ্যমে এটি ব্যাকগ্রাউন্ডে কাজ করে।
Stream API এবং ForkJoinPool এর Integration
১. Sequential Stream vs Parallel Stream
Stream API স্বাভাবিকভাবে sequential হয়। যখন parallel() ব্যবহার করা হয়, এটি ForkJoinPool ব্যবহার করে প্যারালাল প্রসেসিং করে।
উদাহরণ:
import java.util.stream.IntStream;
public class StreamWithForkJoin {
public static void main(String[] args) {
// Sequential Stream
System.out.println("Sequential Stream:");
IntStream.range(1, 10)
.forEach(i -> System.out.println(Thread.currentThread().getName() + " - " + i));
// Parallel Stream
System.out.println("\nParallel Stream:");
IntStream.range(1, 10).parallel()
.forEach(i -> System.out.println(Thread.currentThread().getName() + " - " + i));
}
}
ফলাফল:
- Sequential Stream: সব কাজ এক থ্রেডে সম্পন্ন হয়।
- Parallel Stream: কাজগুলো বিভিন্ন থ্রেডে বিভক্ত হয়।
২. ForkJoinPool ব্যবহার কাস্টমাইজ করা
Stream API ডিফল্টভাবে ForkJoinPool.commonPool() ব্যবহার করে। কিন্তু, কাস্টম ForkJoinPool ব্যবহার করাও সম্ভব।
কোড উদাহরণ:
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;
public class CustomForkJoinPool {
public static void main(String[] args) {
ForkJoinPool customPool = new ForkJoinPool(4); // 4 থ্রেডের কাস্টম পুল
try {
customPool.submit(() -> {
IntStream.range(1, 10).parallel()
.forEach(i -> System.out.println(Thread.currentThread().getName() + " - " + i));
}).get();
} catch (Exception e) {
e.printStackTrace();
} finally {
customPool.shutdown();
}
}
}
ব্যাখ্যা:
- ForkJoinPool.commonPool() ব্যবহার না করে একটি কাস্টম পুল ডিফাইন করা হয়েছে।
submit()মেথডের মাধ্যমে টাস্ক সাবমিট করা হয়।
৩. প্যারালাল প্রসেসিং এর জন্য Stream API এবং ForkJoinPool ব্যবহার
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class StreamForkJoinExample {
public static void main(String[] args) {
List<Integer> numbers = IntStream.range(1, 20).boxed().collect(Collectors.toList());
// Default ForkJoinPool (commonPool)
System.out.println("Default Parallel Stream:");
numbers.parallelStream()
.map(i -> i * i)
.forEach(i -> System.out.println(Thread.currentThread().getName() + " - " + i));
// Custom ForkJoinPool
ForkJoinPool customPool = new ForkJoinPool(3); // 3 থ্রেডের পুল
System.out.println("\nCustom ForkJoinPool:");
try {
customPool.submit(() -> {
numbers.parallelStream()
.map(i -> i * i)
.forEach(i -> System.out.println(Thread.currentThread().getName() + " - " + i));
}).get();
} catch (Exception e) {
e.printStackTrace();
} finally {
customPool.shutdown();
}
}
}
৪. Parallel Reduce অপারেশন
Stream API এবং ForkJoinPool ব্যবহার করে প্যারালাল প্রক্রিয়াজাতকরণে reduce() অপারেশন।
import java.util.stream.IntStream;
public class ParallelReduceExample {
public static void main(String[] args) {
int sum = IntStream.range(1, 100)
.parallel()
.reduce(0, Integer::sum);
System.out.println("Sum of numbers (Parallel Reduce): " + sum);
}
}
ForkJoinPool এর Behavior
- Recursive Task Division: ForkJoinPool কাজগুলোকে ছোট ছোট টাস্কে ভাগ করে (fork) এবং সেগুলো আলাদাভাবে সম্পন্ন করে (join)।
- Work Stealing: একটি থ্রেড যদি তার টাস্ক শেষ করে ফেলে, এটি অন্য থ্রেডের অসম্পূর্ণ টাস্ক নিয়ে কাজ করতে পারে।
- Common Pool:
Stream.parallel()ডিফল্ট ForkJoinPool ব্যবহার করে।
Stream API এবং ForkJoinPool Integration এর সুবিধা
- Simple Parallelism: সহজেই প্যারালাল প্রসেসিং ইমপ্লিমেন্ট করা যায়।
- Efficiency: ForkJoinPool এর মাধ্যমে কার্যকর প্যারালাল প্রসেসিং।
- Scalability: বড় ডেটাসেটের জন্য কার্যকর।
- Work Stealing Algorithm: থ্রেডগুলোর মধ্যে লোড ব্যালেন্সিং।
ব্যবহারিক উদাহরণ: বড় ডেটাসেট প্রসেসিং
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class LargeDatasetProcessing {
public static void main(String[] args) {
List<Integer> largeDataset = IntStream.range(1, 1_000_000).boxed().collect(Collectors.toList());
ForkJoinPool customPool = new ForkJoinPool(8); // 8 থ্রেডের পুল
try {
customPool.submit(() -> {
long count = largeDataset.parallelStream()
.filter(i -> i % 2 == 0)
.count();
System.out.println("Count of even numbers: " + count);
}).get();
} catch (Exception e) {
e.printStackTrace();
} finally {
customPool.shutdown();
}
}
}
Stream API এবং ForkJoinPool Integration এর সীমাবদ্ধতা
- Overhead: ছোট ডেটাসেটের জন্য প্যারালাল প্রসেসিং অপ্রয়োজনীয় ও ধীর হতে পারে।
- Thread Contention: অত্যধিক থ্রেড ব্যবহার করলে ডেটা রেস এবং কনটেনশন সমস্যা হতে পারে।
- Complex Debugging: প্যারালাল স্ট্রিমে ডিবাগিং করা কঠিন।
Stream API এবং ForkJoinPool একত্রে প্যারালাল প্রসেসিং সহজ এবং কার্যকর করে। বিশেষত, বড় ডেটাসেট বা কম্পিউটেশনাল কাজের জন্য এটি উপযুক্ত। তবে ডেটাসেটের আকার এবং থ্রেড ব্যবহারের সঠিক ভারসাম্য নিশ্চিত করা গুরুত্বপূর্ণ।
Parallel Streams জাভার Streams API এর একটি অংশ, যা Fork/Join Framework ব্যবহার করে ডেটা প্রসেসিংকে মাল্টিপ্রসেসর কোরে ভাগ করে। এটি বড় ডেটা সেটের জন্য পারফরম্যান্স উন্নত করে এবং প্রোগ্রামিংকে আরও কার্যকর ও সহজ করে তোলে।
Parallel Streams এর বৈশিষ্ট্য
- অ্যাসিঙ্ক্রোনাস প্রসেসিং: একাধিক থ্রেড ব্যবহার করে কাজ ভাগ করে।
- ডেটা প্যারালেলিজম: ডেটা অংশগুলো আলাদাভাবে প্রসেস হয়।
- ডিফল্ট থ্রেড পুল: JVM-এর common ForkJoinPool ব্যবহার করে, যা থ্রেড সংখ্যা কন্ট্রোল করে।
- সহজ সিনট্যাক্স: সাধারণ streams থেকে সহজেই প্যারালেল স্ট্রিমে রূপান্তর করা যায়।
Parallel Streams এর ব্যবহার
১. সাধারণ Parallel Streams উদাহরণ
import java.util.List;
import java.util.stream.IntStream;
public class ParallelStreamExample {
public static void main(String[] args) {
// বড় ডেটা সেট তৈরি
List<Integer> numbers = IntStream.range(1, 10001).boxed().toList();
// Sequential Stream
long startTime = System.currentTimeMillis();
numbers.stream().forEach(ParallelStreamExample::process);
long endTime = System.currentTimeMillis();
System.out.println("Sequential Stream Time: " + (endTime - startTime) + " ms");
// Parallel Stream
startTime = System.currentTimeMillis();
numbers.parallelStream().forEach(ParallelStreamExample::process);
endTime = System.currentTimeMillis();
System.out.println("Parallel Stream Time: " + (endTime - startTime) + " ms");
}
private static void process(Integer number) {
try {
Thread.sleep(1); // প্রতিটি আইটেম প্রক্রিয়া করতে সময় নেয়
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
আউটপুট:
Sequential Stream Time: 10000 ms
Parallel Stream Time: ~2000 ms
২. Parallel Streams এর মাধ্যমে ডেটা ফিল্টার ও ম্যাপ করা
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class ParallelStreamFilterExample {
public static void main(String[] args) {
List<Integer> numbers = IntStream.range(1, 10001).boxed().toList();
// Parallel Stream ব্যবহার করে ফিল্টার ও ম্যাপ করা
List<Integer> evenNumbers = numbers.parallelStream()
.filter(n -> n % 2 == 0) // শুধুমাত্র জোড় সংখ্যা ফিল্টার
.map(n -> n * n) // প্রতিটি সংখ্যা স্কয়ার
.collect(Collectors.toList());
System.out.println("Even Numbers Count: " + evenNumbers.size());
}
}
Parallel Streams এর সুবিধা
- পারফরম্যান্স বৃদ্ধি: বড় ডেটা সেট প্রক্রিয়াকরণের সময় কম লাগে।
- সহজ ব্যবহার: Streams API এর উপর ভিত্তি করে সহজেই ব্যবহার করা যায়।
- কোর ইউটিলাইজেশন: মাল্টি-কোর প্রসেসর ব্যবহার করে কাজ ভাগ করা হয়।
Parallel Streams এর সীমাবদ্ধতা
- থ্রেড-সেফটি: যদি ডেটা স্ট্রাকচার থ্রেড-সেফ না হয়, তাহলে ConcurrentModificationException হতে পারে।
- কিছু ছোট ডেটাসেটে ওভারহেড: Parallel Streams ছোট ডেটাসেটের জন্য ওভারহেড তৈরি করতে পারে।
- উন্নত নিয়ন্ত্রণের অভাব: থ্রেড সংখ্যা সরাসরি নিয়ন্ত্রণ করা সম্ভব নয়; JVM এটি পরিচালনা করে।
Parallel Streams ব্যবহার করার সময় সতর্কতা
১. সাইড এফেক্ট এড়ানো
যদি স্ট্রিমে কোনো সাইড এফেক্ট থাকে (যেমন ডেটা মডিফিকেশন), তাহলে Parallel Streams ব্যবহার না করাই ভালো।
// ভুল ব্যবহার: সাইড এফেক্ট সৃষ্টি করে
List<Integer> numbers = IntStream.range(1, 10001).boxed().toList();
List<Integer> results = new ArrayList<>();
numbers.parallelStream().forEach(n -> results.add(n * n)); // ConcurrentModificationException হতে পারে
২. সংক্ষিপ্ত প্রসেসিং
Parallel Streams শুধুমাত্র বড় ডেটাসেটের জন্য উপযোগী। ছোট ডেটাসেটের জন্য sequential() ব্যবহার করুন।
Parallel Streams বনাম Sequential Streams
| প্যারামিটার | Sequential Streams | Parallel Streams |
|---|---|---|
| প্রসেসিং মডেল | একক থ্রেড ব্যবহার করে। | একাধিক থ্রেড ব্যবহার করে। |
| পারফরম্যান্স | ছোট ডেটাসেটের জন্য কার্যকর। | বড় ডেটাসেটের জন্য কার্যকর। |
| সিনট্যাক্স | stream() ব্যবহার করে। | parallelStream() ব্যবহার করে। |
| থ্রেড কন্ট্রোল | কোনো থ্রেড ব্যবস্থাপনা নেই। | JVM স্বয়ংক্রিয়ভাবে থ্রেড নিয়ন্ত্রণ করে। |
Parallel Streams এর কাস্টম থ্রেড পুল ব্যবহার
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;
public class CustomParallelStreamExample {
public static void main(String[] args) {
List<Integer> numbers = IntStream.range(1, 10001).boxed().toList();
ForkJoinPool customThreadPool = new ForkJoinPool(4); // কাস্টম থ্রেড পুল তৈরি
try {
customThreadPool.submit(() -> {
numbers.parallelStream().forEach(CustomParallelStreamExample::process);
}).get();
} catch (Exception e) {
e.printStackTrace();
} finally {
customThreadPool.shutdown();
}
}
private static void process(Integer number) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
- Parallel Streams বড় ডেটাসেট প্রক্রিয়াকরণের জন্য কার্যকর এবং মাল্টি-কোর প্রসেসর ব্যবহার করে পারফরম্যান্স উন্নত করে।
- ব্যবহারের সতর্কতা: সাইড এফেক্ট এবং ছোট ডেটাসেটের জন্য এটি সাবধানে ব্যবহার করা উচিত।
- কাস্টম থ্রেড পুল: আরও নিয়ন্ত্রণের জন্য কাস্টম থ্রেড পুল ব্যবহার করা যায়।
এই পদ্ধতি সঠিকভাবে ব্যবহার করলে ডেটা প্রসেসিং অনেক বেশি কার্যকর এবং দ্রুত করা সম্ভব।
জাভার Stream API ডেটা প্রসেসিংয়ের জন্য একটি শক্তিশালী টুল যা Sequential এবং Parallel Streams এ কাজ করতে পারে। উভয়ই ডেটা প্রসেসিংয়ে কার্যকর হলেও, তাদের ব্যবহার এবং পারফরম্যান্সে বড় পার্থক্য রয়েছে।
Sequential Stream
Sequential Stream ডেটা প্রসেসিং ধাপে ধাপে সম্পন্ন করে, যেখানে প্রতিটি ধাপ পরবর্তী ধাপ শুরু হওয়ার আগে শেষ হয়। এটি সিঙ্গেল থ্রেড ব্যবহার করে।
বৈশিষ্ট্য:
- সিঙ্গল থ্রেড: Sequential Stream সব অপারেশন একক থ্রেডে সম্পন্ন করে।
- অর্ডার মেইন্টেন: ডেটা প্রসেসিং সর্বদা উৎসের ক্রম অনুযায়ী হয়।
- কমপ্লেক্স নয়: সিম্পল এবং ডিবাগিং সহজ।
উদাহরণ:
import java.util.stream.IntStream;
public class SequentialStreamExample {
public static void main(String[] args) {
System.out.println("Sequential Stream:");
IntStream.range(1, 10)
.forEach(i -> System.out.println(Thread.currentThread().getName() + " : " + i));
}
}
আউটপুট (সম্ভাব্য):
Sequential Stream:
main : 1
main : 2
main : 3
...
main : 9
ব্যাখ্যা: main থ্রেড Sequential Stream এর সব কাজ সম্পন্ন করে।
Parallel Stream
Parallel Stream ডেটা প্রসেসিং মাল্টিপল থ্রেডে বিভক্ত করে। এটি ForkJoinPool ব্যবহার করে Parallel Processing পরিচালনা করে।
বৈশিষ্ট্য:
- মাল্টিপল থ্রেড: Parallel Stream কাজকে একাধিক থ্রেডে ভাগ করে।
- পারফরম্যান্স উন্নতি: বড় ডেটাসেটের জন্য দ্রুত কাজ সম্পন্ন করে।
- অর্ডার অপরিহার্য নয়: কিছু ক্ষেত্রে উৎস ক্রম মেইন্টেন না-ও হতে পারে।
উদাহরণ:
import java.util.stream.IntStream;
public class ParallelStreamExample {
public static void main(String[] args) {
System.out.println("Parallel Stream:");
IntStream.range(1, 10)
.parallel()
.forEach(i -> System.out.println(Thread.currentThread().getName() + " : " + i));
}
}
আউটপুট (সম্ভাব্য):
Parallel Stream:
ForkJoinPool.commonPool-worker-3 : 1
main : 2
ForkJoinPool.commonPool-worker-5 : 3
...
ব্যাখ্যা: একাধিক থ্রেড ব্যবহার করে কাজ দ্রুত সম্পন্ন হয়।
Sequential এবং Parallel Streams এর মধ্যে পার্থক্য
| বৈশিষ্ট্য | Sequential Stream | Parallel Stream |
|---|---|---|
| থ্রেড ব্যবহারে কৌশল | সিঙ্গল থ্রেড | মাল্টিপল থ্রেড |
| অর্ডার মেইন্টেন | উৎস ক্রম অনুযায়ী ডেটা প্রসেস করে | সবসময় উৎস ক্রম মেইন্টেন করে না |
| পারফরম্যান্স | ছোট ডেটাসেটের জন্য কার্যকর | বড় ডেটাসেটের জন্য কার্যকর |
| কোড সিম্প্লিসিটি | সহজ এবং ডিবাগিং সহজ | ডিবাগিং কিছুটা কঠিন |
| উদ্দেশ্য | ছোট ডেটাসেট এবং সিঙ্গেল-থ্রেড প্রসেসিং | বড় ডেটাসেট এবং মাল্টি-থ্রেড প্রসেসিং |
Sequential এবং Parallel Streams এর পারফরম্যান্স তুলনা
কোড: Sequential বনাম Parallel
import java.util.stream.LongStream;
public class StreamPerformanceComparison {
public static void main(String[] args) {
long n = 10_000_000;
// Sequential Stream
long startTime = System.currentTimeMillis();
long sequentialSum = LongStream.rangeClosed(1, n).sum();
long endTime = System.currentTimeMillis();
System.out.println("Sequential Stream Time: " + (endTime - startTime) + " ms");
// Parallel Stream
startTime = System.currentTimeMillis();
long parallelSum = LongStream.rangeClosed(1, n).parallel().sum();
endTime = System.currentTimeMillis();
System.out.println("Parallel Stream Time: " + (endTime - startTime) + " ms");
}
}
সম্ভাব্য আউটপুট:
Sequential Stream Time: 120 ms
Parallel Stream Time: 45 ms
বিশ্লেষণ:
- Parallel Stream বড় ডেটাসেটের জন্য দ্রুত কাজ সম্পন্ন করে।
- ছোট ডেটাসেটের জন্য Sequential Stream ভালো।
Best Practices
- ছোট ডেটাসেটের জন্য Sequential Stream ব্যবহার করুন।
- Parallel Stream ছোট ডেটাসেটে ওভারহেড তৈরি করে।
- Parallel Stream ব্যবহার করার আগে প্রসেসিং টাইম পরিমাপ করুন।
- Parallel Stream বড় ডেটাসেটের জন্যই কার্যকর।
সঠিক অর্ডার প্রয়োজন হলে Parallel Stream এ
forEachOrdered()ব্যবহার করুন।IntStream.range(1, 10) .parallel() .forEachOrdered(System.out::println);- স্টেটলেস অপারেশন নিশ্চিত করুন।
- Parallel Stream এ স্টেটফুল অপারেশন (যেমন গ্লোবাল ভেরিয়েবল পরিবর্তন) সমস্যা তৈরি করতে পারে।
Common Pitfalls
- Incorrect Use of Parallel Stream
- ছোট ডেটাসেট বা কমপ্লেক্স ডেটা প্রসেসিংয়ে Parallel Stream ওভারহেড তৈরি করে এবং পারফরম্যান্স কমায়।
- Unordered Processing
- Parallel Stream উৎস ক্রম বজায় না রাখলে ভুল ফলাফল হতে পারে।
Stateful Lambda Expression
- Parallel Stream এ স্টেটফুল ল্যাম্বডা এক্সপ্রেশন ডেটা ইনকনসিস্টেন্সি সৃষ্টি করতে পারে।
List<Integer> list = new ArrayList<>(); IntStream.range(1, 100).parallel().forEach(list::add); // Unsafe!- ForkJoinPool Overhead
- Parallel Stream অতিরিক্ত থ্রেড ব্যবহারে ForkJoinPool ওভারলোড হতে পারে।
| স্ট্রিম টাইপ | সেরা ব্যবহারের পরিস্থিতি |
|---|---|
| Sequential | ছোট ডেটাসেট, ডিবাগিং সহজ, এবং উৎস ক্রম প্রয়োজন। |
| Parallel | বড় ডেটাসেট এবং মাল্টি-থ্রেড প্রসেসিং যেখানে পারফরম্যান্স গুরুত্বপূর্ণ। |
Sequential এবং Parallel Streams সঠিকভাবে ব্যবহার করলে ডেটা প্রসেসিং আরও কার্যকর এবং কনকারেন্ট অ্যাপ্লিকেশন তৈরি সহজ হয়।
Read more